
#include "appinc.h"
#include "ssiot_api.h"
#include "ssiot_internal.h"


/*
 * Default instance. It is set by ssiot_create() when no default exists yet,
 * cleared by ssiot_destory() when that instance is destroyed, and used by the
 * public API functions in this file whenever they are called with a NULL handle.
 */
static ssiot_t *g_ssiot;


/*
 * Fill an MQTT parameter block from the user-facing parameters.
 *
 * sparam->server has the form "[scheme://]host:port". The schemes "mqtts://"
 * and "tls://" enable SSL; "mqtt://", "tcp://" and a missing scheme leave it
 * disabled. The port is mandatory and must be in the range 1..65535.
 * Username and password are copied only when they are provided, otherwise
 * the corresponding fields of mparam are left untouched.
 *
 * Returns 0 on success, -EINVAL when the server is missing or malformed.
 * Note that mparam is partially written even when -EINVAL is returned.
 */
static int ssiot_param_init(ssiot_param_t *sparam, mqtt_param_t *mparam)
{
	static const struct {
		const char *prefix;
		bool tls;
	} schemes[] = {
		{ "mqtts://", true  },
		{ "tls://",   true  },
		{ "mqtt://",  false },
		{ "tcp://",   false },
	};
	const char *host;
	char *colon;
	size_t i;

	if (sparam->server == NULL)
		return -EINVAL;

	/* Defaults; ssl_enable and port may be overridden below. */
	mparam->keepalive = 300;
	mparam->cleansession = true;
	mparam->ssl_enable = false;
	mparam->port = 0;

	/* Strip a recognised scheme prefix, if any. */
	host = sparam->server;
	for (i = 0; i < sizeof(schemes) / sizeof(schemes[0]); i++) {
		size_t n = strlen(schemes[i].prefix);

		if (strncmp(sparam->server, schemes[i].prefix, n) == 0) {
			mparam->ssl_enable = schemes[i].tls;
			host = sparam->server + n;
			break;
		}
	}
	strlcpy(mparam->hostname, host, sizeof(mparam->hostname));

	/* Split "host:port" in place at the first colon. */
	colon = strchr(mparam->hostname, ':');
	if (colon != NULL) {
		*colon = '\0';
		mparam->port = atoi(colon + 1);
	}

	if (mparam->hostname[0] == '\0')
		return -EINVAL;
	if (mparam->port <= 0 || mparam->port >= 65536)
		return -EINVAL;

	if (sparam->username != NULL)
		strlcpy(mparam->username, sparam->username, sizeof(mparam->username));
	if (sparam->password != NULL)
		strlcpy(mparam->password, sparam->password, sizeof(mparam->password));

	return 0;
}


/*
 * Make sure the MQTT session is connected.
 *
 * When already connected this is a no-op. Otherwise a fresh client id is
 * generated from the device serial number and the current uptime, and a
 * connection is made either to the configured server or, while on_goto is
 * set, to the redirect server stored in goto_server (using a temporary copy
 * of the configured parameters).
 *
 * Returns 1 when connected, a negative value on failure. The retry counter
 * is reset on success and incremented on failure.
 */
static int ssiot_connect(ssiot_t *ssiot)
{
	int rc;

	if (ssiot->mqtt.connected > 0)
		return 1;
	/* A negative state is cleaned up with a disconnect before reconnecting. */
	if (ssiot->mqtt.connected < 0)
		hmqtt_disconnect(&ssiot->mqtt, NULL);

	snprintf(ssiot->param.clientid, sizeof(ssiot->param.clientid), "%s%lu", ssiot->devsn, libsys_get_uptime_ms());

	if (!ssiot->on_goto) {
		rc = hmqtt_connect(&ssiot->mqtt, &ssiot->param);
	} else {
		mqtt_param_t redirect;
		ssiot_param_t target;

		/* Start from the configured parameters, then override the server. */
		memcpy(&redirect, &ssiot->param, sizeof(redirect));
		memset(&target, 0, sizeof(target));
		target.server = ssiot->goto_server;
		rc = ssiot_param_init(&target, &redirect);
		if (rc == 0)
			rc = hmqtt_connect(&ssiot->mqtt, &redirect);
	}

	if (rc < 0) {
		ssiot->retry++;
		LOG_W("MQTT connect fail %d times", ssiot->retry);
		return rc;
	}

	ssiot->connect_time = libsys_get_uptime();
	ssiot->retry = 0;
	return 1;
}


/*
 * Subscribe to every topic addressed to this device ("/iot/<devsn>/#").
 *
 * force: when true a subscribe failure is tolerated (the session is kept and
 *        1 is returned); when false a failure disconnects the session and the
 *        error is returned.
 *
 * Returns 1 when subscribed (or when a failure was tolerated), a negative
 * value when the subscribe failed and force is false.
 */
static int ssiot_subscribe(ssiot_t *ssiot, bool force)
{
	char topic[64];
	int ret;

	if (ssiot->mqtt.subscribed > 0)
		return 1;

	snprintf(topic, sizeof(topic), "/iot/%s/#", ssiot->devsn);
	ret = hmqtt_subscribe(&ssiot->mqtt, topic, 0);
	if (ret < 0) {
		if (force == false) {
			hmqtt_disconnect(&ssiot->mqtt, "subscribe fail");
			return ret;
		}
		/* Tolerated failure: report it instead of claiming success. */
		LOG_W("%s: subscribe %s fail(%d), ignored", ssiot->mqtt.name, topic, ret);
		return 1;
	}
	LOG_I("%s: subscribe %s success", ssiot->mqtt.name, topic);

	return 1;
}



/*
 * Handle a "login/ack" message.
 *
 * An ack whose sid does not match the last login request is ignored.
 * Otherwise the session is marked as logged in and the MQTT loop is flagged
 * to wake up. If the ack carries a non-empty "goto" string naming a valid
 * server different from the configured one, it is stored in goto_server
 * (replacing any previous value) so the connection logic can switch to it.
 * A new goto is ignored while a goto is already in progress.
 *
 * Always returns 0.
 */
static int login_receive(ssiot_t *ssiot, ssiot_message_t *msg)
{
	char *target = NULL;
	cJSON *item;
	ssiot_param_t probe;
	mqtt_param_t parsed;

	if (msg->sid != ssiot->sid_s)
		return 0;

	ssiot->logined = true;
	ssiot->mqtt.wakeup = true;

	if (msg->data) {
		item = cJSON_GetObjectItem(msg->data, "goto");
		if (item != NULL)
			target = item->valuestring;
	}
	if (target == NULL || target[0] == '\0')
		return 0;

	if (ssiot->goto_server) {
		if (ssiot->on_goto) {
			LOG_W("current is no goto(%s)", ssiot->goto_server);
			return 0;
		}
		/* Same redirect as the one already stored: nothing to do. */
		if (strcmp(ssiot->goto_server, target) == 0)
			return 0;
	}

	/* Parse the address only to validate it and compare with the current server. */
	memset(&probe, 0, sizeof(probe));
	probe.server = target;
	if (ssiot_param_init(&probe, &parsed) < 0) {
		LOG_W("goto addr(%s) invalid", target);
		return 0;
	}
	if (parsed.port == ssiot->param.port && strcmp(parsed.hostname, ssiot->param.hostname) == 0) {
		LOG_I("goto the same server, ignore goto");
		return 0;
	}

	free(ssiot->goto_server);
	ssiot->goto_server = strdup(target);

	return 0;
}



/*
 * Send a login request and give the MQTT loop time to receive the ack.
 *
 * The `logined` flag is set by login_receive() when a matching ack arrives.
 * While a goto is in progress the login is abandoned (and the session
 * disconnected) once two requests have gone unanswered; otherwise, after
 * three unanswered requests, the device stops asking and treats itself as
 * logged in.
 *
 * Returns 1 when logged in, 0 when a request was published and the caller
 * should call again to re-check the state, a negative value on failure.
 */
static int ssiot_login(ssiot_t *ssiot)
{
	char topic[64], buf[256];
	int ret;

	if (ssiot->on_goto && ssiot->retry >= 2) {
		hmqtt_disconnect(&ssiot->mqtt, "login fail");
		return -1;
	}

	if (ssiot->retry >= 3) {
		LOG_I("%s: stop publish login", ssiot->mqtt.name);
		ssiot->retry = 0;
		ssiot->logined = true;
	}
	if (ssiot->logined)
		return 1;

	/* sid 0 is skipped; on wrap-around the counter is re-seeded from the uptime. */
	if (++ssiot->sid_s == 0)
		ssiot->sid_s = libsys_get_uptime();

	snprintf(topic, sizeof(topic), "/iot/$/%s/login", ssiot->devsn);
	if (ssiot->key)
		snprintf(buf, sizeof(buf), TOSTR({"sid":%u,"data":{"model":"%s", "key":"%s"}}), ssiot->sid_s, ssiot->model, ssiot->key);
	else
		snprintf(buf, sizeof(buf), TOSTR({"sid":%u,"data":{"model":"%s"}}), ssiot->sid_s, ssiot->model);
	ret = hmqtt_publish_qos0(&ssiot->mqtt, topic, buf, strlen(buf));
	if (ret < 0)
		return ret;
	ssiot->retry++;
	/* The wait grows with every attempt (second argument presumably in ms - confirm). */
	ret = hmqtt_loop(&ssiot->mqtt, ssiot->retry*2000);
	if (ret < 0)
		return ret;

	/* Only claim success when the ack has actually been processed. */
	if (ssiot->logined)
		LOG_I("%s: login success", ssiot->mqtt.name);
	else
		LOG_W("%s: login ack not received, retry %d", ssiot->mqtt.name, ssiot->retry);

	return 0;
}



/*
 * Run one step of the connect / subscribe / login sequence.
 *
 * Returns 1 when the session is connected, subscribed and logged in and no
 * server switch is pending; 0 when the caller should simply call again
 * (waited for the reconnect delay, login still pending, or a switch to the
 * goto server has just been started); a negative value when connecting,
 * logging in or subscribing failed, in which case the next attempt is
 * delayed through next_time.
 */
static int ssiot_connect_process(ssiot_t *ssiot)
{
	int ret, wait;
	uint64_t now;

	if (ssiot->on_goto && ssiot->fails) {
		// switching to the goto server failed
		const static int times[] = {60, 300, 600, 1800, 3600, 86400};
		uint32_t fails = ssiot->goto_fails++;
		LOG_I("%s: goto fails:%d", ssiot->mqtt.name, ssiot->goto_fails);
		// delay before the next goto attempt, in seconds, clamped to the last table entry
		if (fails >= ARRAY_SIZE(times))
			fails = ARRAY_SIZE(times) - 1;
		ssiot->goto_time = libsys_get_uptime() + times[fails];
		// switch back to the originally configured server immediately
		ssiot->next_time = 0;
		// release the goto resources
		ssiot->on_goto = false;
		free(ssiot->goto_server);
		ssiot->goto_server = NULL;
	}
	// next_time is compared against the millisecond uptime
	now = libsys_get_uptime_ms();
	if (now < ssiot->next_time) {
		wait = ssiot->next_time - now;
		libnotify_wait(&ssiot->notify, wait);
		return 0;
	}
	ret = ssiot_connect(ssiot);
	if (ret > 0) {
		// subscribe once before login and once more after a successful login, for better compatibility
		ssiot_subscribe(ssiot, true);
		ret = ssiot_login(ssiot);
		if (ret > 0)
			ret = ssiot_subscribe(ssiot, false);
	}

	if (ret < 0) {
		LOG_I("%s: connect failed", ssiot->mqtt.name);
		ssiot->fails++;
		ssiot->logined = false;
		wait = ssiot->fails*10;
		// the reconnect interval is capped at 30 minutes
		if (wait > 1800)
			wait = 1800;
		ssiot->next_time = libsys_get_uptime_ms() + wait*1000;
	}
	if (ret <= 0)
		return ret;
	LOG_I("%s: connect success", ssiot->mqtt.name);

	// after login completes, check whether we have to move to another server
	if (ssiot->goto_server && ssiot->on_goto == false && libsys_get_uptime() >= ssiot->goto_time) {
		LOG_I("%s: goto %s", ssiot->mqtt.name, ssiot->goto_server);
		ssiot->fails = 0;
		ssiot->retry = 0;
		ssiot->logined = false;
		ssiot->on_goto = true;
		hmqtt_disconnect(&ssiot->mqtt, "goto next");
		return 0;
	}

	return 1;
}


/*
 * Service the MQTT session until it drops or a pending goto becomes due.
 *
 * While a goto server is stored but not yet in use, the loop timeout is
 * limited to the time remaining until goto_time so the switch happens on
 * schedule. Once traffic has been serviced successfully on the goto server,
 * the goto failure counter is cleared.
 *
 * On exit, a session that lasted at least 10 seconds (or that was closed on
 * purpose to switch to the goto server) clears the failure counter; a
 * shorter one counts as a failure and delays the next connection attempt.
 *
 * Returns the last hmqtt_loop() result (0 when the loop never ran).
 */
static int ssiot_loop_work(ssiot_t *ssiot)
{
	int ret = 0, timeout, wait;
	uint32_t now;
	bool switching = false;

	ssiot->uptimes++;
	ssiot->online = libsys_get_uptime();

	while (1) {
		now = libsys_get_uptime();

		timeout = 0;
		if (ssiot->goto_server && ssiot->on_goto == false) {
			if (now >= ssiot->goto_time) {
				LOG_I("%s: goto %s", ssiot->mqtt.name, ssiot->goto_server);
				ssiot->on_goto = true;
				switching = true;
				hmqtt_disconnect(&ssiot->mqtt, "goto next");
				break;
			}
			timeout = ssiot->goto_time - now;
		}
		ret = hmqtt_loop(&ssiot->mqtt, timeout*1000);
		if (ret < 0 || ssiot->mqtt.connected <= 0){
			LOG_I("%s: mqtt disconnected and need to reconnected", ssiot->mqtt.name);
			break;
		}
		if (ssiot->on_goto && ssiot->goto_fails)
			ssiot->goto_fails = 0;
	}

	now = libsys_get_uptime();
	ssiot->offline = now;
	if (switching || now - ssiot->connect_time >= 10) {
		/*
		 * A deliberate switch must not count as a failure: the connect
		 * logic treats "on_goto with fails != 0" as a failed goto attempt.
		 */
		ssiot->fails = 0;
	} else {
		ssiot->fails++;
		wait = ssiot->fails*10;
		/* the reconnect interval is capped at 30 minutes */
		if (wait > 1800)
			wait = 1800;
		/* next_time is compared against the millisecond uptime by the connect logic */
		ssiot->next_time = libsys_get_uptime_ms() + wait*1000;
	}

	return ret;
}


/*
 * Build and publish the acknowledgement for a received message.
 *
 * The reply is a JSON object carrying, when present, "sid", "from" (the
 * original "to"), "data", "code" and "msg", published with QoS 0 on
 * "/iot/<src>/<devsn>/<topic>/ack".
 *
 * msg->resp_data is attached to the reply object and released together with
 * it. msg->resp_msg is freed here when msg->free_msg is set.
 *
 * Returns the publish result, or -ENOMEM when the reply cannot be built.
 */
static int ssiot_response(ssiot_t *ssiot, ssiot_message_t *msg, const char *topic)
{
	char ack_topic[128];
	char *text;
	cJSON *root;
	int rc;

	root = cJSON_CreateObject();
	if (!root)
		return -ENOMEM;

	if (msg->sid)
		cJSON_AddNumberToObject(root, "sid", msg->sid);
	if (msg->to)
		cJSON_AddStringToObject(root, "from", msg->to);
	if (msg->resp_data)
		cJSON_AddItemToObject(root, "data", msg->resp_data);
	if (msg->resp_code)
		cJSON_AddNumberToObject(root, "code", msg->resp_code);
	if (msg->resp_msg)
		cJSON_AddStringToObject(root, "msg", msg->resp_msg);
	/* The text has been copied into the object, so an owned message can go now. */
	if (msg->resp_msg && msg->free_msg) {
		msg->free_msg = false;
		free(msg->resp_msg);
		msg->resp_msg = NULL;
	}

	text = cJSON_PrintUnformatted(root);
	cJSON_Delete(root);
	if (!text)
		return -ENOMEM;

	snprintf(ack_topic, sizeof(ack_topic), "/iot/%s/%s/%s/ack", msg->src, ssiot->devsn, topic);
	rc = hmqtt_publish_qos0(&ssiot->mqtt, ack_topic, text, strlen(text));
	free(text);

	return rc;
}


/*
 * MQTT receive callback.
 *
 * Accepts topics of the form "/iot/<devsn>/<src>/<topic...>", splits them
 * into the source address and the remaining topic, parses the payload as
 * JSON (except for "stream/" topics) and dispatches the message:
 * "login/ack" without a foreign "to" goes to login_receive(), everything
 * else to the user callback. When the handler filled in resp_data or
 * resp_code an acknowledgement is published.
 *
 * arg is the ssiot_t instance registered with hmqtt_init().
 */
static void on_ssiot_receive(mqtt_message_t *message, void *arg)
{
	ssiot_t *ssiot = (ssiot_t *)arg;
	ssiot_message_t msg;
	int topic_len, len;
	char *topic;
	char topic_n[128] = {0};
	cJSON *json, *json_s;

	if (message->topic.cstring) {
		topic = message->topic.cstring;
		/* Measure the received topic, not the (still empty) scratch buffer. */
		topic_len = strlen(topic);
	} else if (message->topic.lenstring.data) {
		topic = message->topic.lenstring.data;
		topic_len = message->topic.lenstring.len;
	} else {
		return;
	}
	/* compare the prefix */
	len = snprintf(topic_n, sizeof(topic_n), "/iot/%s/", ssiot->devsn);
	if (len < 0 || len >= (int)sizeof(topic_n))
		return;
	if (topic_len <= len || strncmp(topic, topic_n, len))
		return;
	topic_len -= len;
	topic += len;
	if (topic_len > (int)sizeof(topic_n) - 1)
		topic_len = sizeof(topic_n) - 1;
	/*
	 * Bounded copy: a length-delimited topic is not guaranteed to be
	 * NUL-terminated, so it must not be handed to strlcpy().
	 */
	memcpy(topic_n, topic, topic_len);
	topic_n[topic_len] = '\0';
	LOG_D("%s recive: %s(%d)", ssiot->mqtt.name, topic_n, message->payloadlen);
	/* topic_n now starts with the source address */
	topic = strchr(topic_n, '/');
	if (topic == NULL)
		return;
	*topic++ = '\0';

	memset(&msg, 0, sizeof(msg));
	msg.src = topic_n;
	msg.payload = message->payload;
	msg.payloadlen = message->payloadlen;
	msg.topic = topic;
	msg.type = SSIOT_MSG_PAYLOAD;

	/*
	 * "stream/" payloads are not parsed as JSON; other custom topics that
	 * must stay raw can be added here.
	 * NOTE(review): cJSON_Parse() expects a NUL-terminated buffer - confirm
	 * that the MQTT layer terminates the payload.
	 */
	if (!libstr_cmp_prefix(msg.topic, "stream/")) {
		json = NULL;
	} else {
		json = cJSON_Parse((const char *)message->payload);
	}
	if (json) {
		msg.type = SSIOT_MSG_JSON;
		LOG_D("%s recive data: %s", ssiot->mqtt.name, message->payload);
		json_s = cJSON_GetObjectItem(json, "sid");
		if (json_s && json_s->type == cJSON_Number)
			msg.sid = json_s->valueint;
		json_s = cJSON_GetObjectItem(json, "to");
		if (json_s && json_s->valuestring && json_s->valuestring[0]) {
			/* a "to" that names this device is ignored */
			if (strcmp(json_s->valuestring, ssiot->devsn))
				msg.to = json_s->valuestring;
		}
		msg.data = cJSON_GetObjectItem(json, "data");
	}

	if (!strcmp(topic, "login/ack") && msg.to == NULL)
		login_receive(ssiot, &msg);
	else if (ssiot->on_message)
		ssiot->on_message(&msg, ssiot->msg_arg);

	if (msg.resp_data || msg.resp_code)
		ssiot_response(ssiot, &msg, topic);
	if (json)
		cJSON_Delete(json);
}


/*
 * Record the connection state and, when a user callback is registered,
 * notify it with a SSIOT_MSG_CONNECTED or SSIOT_MSG_DISCONNECTED message
 * (all other message fields are zero).
 */
static void sstio_set_status(ssiot_t *ssiot, bool connected)
{
	ssiot_message_t event;

	ssiot->connected = connected;
	if (!ssiot->on_message)
		return;

	memset(&event, 0, sizeof(event));
	event.type = connected ? SSIOT_MSG_CONNECTED : SSIOT_MSG_DISCONNECTED;
	ssiot->on_message(&event, ssiot->msg_arg);
}

/*
 * Connection thread: keeps (re)establishing the session while the instance
 * is running. Each time the connect sequence completes, the connected state
 * is announced, waiters on the notify object are woken, and the session is
 * serviced until it ends; then it is torn down and the disconnected state is
 * announced.
 */
static void *mqtt_thread_main(void *arg)
{
	ssiot_t *ssiot = arg;

	LOG_I("%s: mqtt thread start", ssiot->mqtt.name);
	while (ssiot->running) {
		/* Anything but a positive result means "not ready yet, try again". */
		if (ssiot_connect_process(ssiot) > 0) {
			sstio_set_status(ssiot, true);
			libnotify_wakeup(&ssiot->notify);
			ssiot_loop_work(ssiot);
			hmqtt_disconnect(&ssiot->mqtt, "unknown");
			sstio_set_status(ssiot, false);
			ssiot->logined = false;
		}
	}
	LOG_I("%s: mqtt thread exit", ssiot->mqtt.name);

	return NULL;
}

/*
 * Sender thread: while the instance is running and connected, takes queued
 * packets off pkg_list one at a time and publishes them with QoS 0. When
 * there is nothing to send (or the session is down) it sleeps on the notify
 * object. On exit every packet still queued is released.
 */
static void *data_thread_main(void *arg)
{
	ssiot_t *ssiot = (ssiot_t *)arg;
	ssiot_pkg_t *pkg;

	LOG_I("%s: data thread start", ssiot->mqtt.name);
	while (ssiot->running) {
		pkg = NULL;
		if (ssiot->connected) {
			/* Inspect and unlink the head under the lock. */
			pthread_mutex_lock(&ssiot->pkg_mutex);
			if (!list_empty(&ssiot->pkg_list)) {
				pkg = (ssiot_pkg_t *) container_of(ssiot->pkg_list.next, ssiot_pkg_t, list);
				ssiot->cache_size -= (sizeof(ssiot_pkg_t) + pkg->size);
				list_del(&pkg->list);
			}
			pthread_mutex_unlock(&ssiot->pkg_mutex);
		}
		if (pkg == NULL) {
			libnotify_wait(&ssiot->notify, 0);
			continue;
		}

		/* Best effort: a packet that fails to publish is dropped. */
		hmqtt_publish_qos0(&ssiot->mqtt, pkg->topic, pkg->data, pkg->size);
		free(pkg);
	}

	/* Release whatever is still queued, fetching the head on every pass. */
	pthread_mutex_lock(&ssiot->pkg_mutex);
	while (!list_empty(&ssiot->pkg_list)) {
		pkg = (ssiot_pkg_t *) container_of(ssiot->pkg_list.next, ssiot_pkg_t, list);
		list_del(&pkg->list);
		free(pkg);
	}
	ssiot->cache_size = 0;
	pthread_mutex_unlock(&ssiot->pkg_mutex);
	LOG_I("%s: data thread exit", ssiot->mqtt.name);

	return NULL;
}


/*
 * Queue a payload for publishing by the sender thread.
 *
 * ssiot: instance handle, or NULL to use the default instance.
 * topic: a full topic when it starts with '/', otherwise a sub-topic that is
 *        expanded to "/iot/$/<devsn>/<topic>".
 * data:  payload bytes (copied); NULL means an empty payload.
 * len:   payload length in bytes; must not be negative.
 *
 * Returns 0 when queued, -EINVAL on bad arguments, -EBUSY when the packet
 * would exceed the cache limit, -ENOMEM when allocation fails.
 */
int ssiot_publish(ssiot_t *ssiot, const char *topic, void *data, int len)
{
	ssiot_pkg_t *pkg;
	int size;

	if (ssiot == NULL)
		ssiot = g_ssiot;
	if (ssiot == NULL || topic == NULL)
		return -EINVAL;
	if (data == NULL)
		len = 0;
	if (len < 0)
		return -EINVAL;

	pthread_mutex_lock(&ssiot->pkg_mutex);
	size = sizeof(ssiot_pkg_t) + len;
	if (ssiot->cache_size + size > ssiot->cache_max) {
		pthread_mutex_unlock(&ssiot->pkg_mutex);
		return -EBUSY;
	}
	/* One extra byte so the stored payload is always NUL-terminated. */
	pkg = malloc(sizeof(ssiot_pkg_t) + len + 1);
	if (pkg == NULL) {
		pthread_mutex_unlock(&ssiot->pkg_mutex);
		return -ENOMEM;
	}
	if (topic[0] == '/')
		strlcpy(pkg->topic, topic, sizeof(pkg->topic));
	else
		snprintf(pkg->topic, sizeof(pkg->topic), "/iot/$/%s/%s", ssiot->devsn, topic);
	pkg->size = len;
	if (len > 0)
		memcpy(pkg->data, data, len);
	pkg->data[len] = '\0';
	list_add_tail(&pkg->list, &ssiot->pkg_list);
	/* Account for the queued bytes; the sender thread subtracts them again. */
	ssiot->cache_size += size;
	pthread_mutex_unlock(&ssiot->pkg_mutex);

	libnotify_wakeup(&ssiot->notify);

	return 0;
}


/*
 * Wrap a JSON value in a message envelope and queue it for publishing on
 * "/iot/$/<devsn>/<type>".
 *
 * ssiot: instance handle, or NULL to use the default instance.
 * from:  optional "from" field.
 * sid:   session id; added to the envelope only when it is not negative.
 * type:  sub-topic appended after the device serial number.
 * data:  value stored under "data". It is only borrowed: it is detached
 *        from the envelope again before the envelope is deleted, so the
 *        caller keeps ownership.
 *
 * Returns the ssiot_publish() result, -EINVAL on bad arguments, or -ENOMEM
 * when the envelope cannot be built.
 */
int ssiot_send_data(ssiot_t *ssiot, const char *from, int sid, char *type, cJSON *data)
{
	char topic[128] = {0};
	char *text;
	cJSON *envelope;
	int rc;

	if (!ssiot)
		ssiot = g_ssiot;
	if (!ssiot || !data)
		return -EINVAL;

	envelope = cJSON_CreateObject();
	if (!envelope)
		return -ENOMEM;

	if (from != NULL)
		cJSON_AddStringToObject(envelope, "from", from);
	if (sid >= 0)
		cJSON_AddNumberToObject(envelope, "sid", sid);
	cJSON_AddItemToObject(envelope, "data", data);

	text = cJSON_PrintUnformatted(envelope);
	/* Take the caller's value back out before the envelope is destroyed. */
	cJSON_DetachItemFromObject(envelope, "data");
	cJSON_Delete(envelope);
	if (!text)
		return -ENOMEM;

	snprintf(topic, sizeof(topic), "/iot/$/%s/%s", ssiot->devsn, type);
	rc = ssiot_publish(ssiot, topic, text, strlen(text));
	free(text);

	return rc;
}


/*
 * Create an instance and start its connection and sender threads.
 *
 * param:      must provide devsn, product and a valid server address; key is
 *             optional and copied when present.
 * on_message: optional callback for received messages and connection state
 *             changes.
 * arg:        opaque pointer handed back to on_message.
 *
 * The first instance created becomes the default instance used by the API
 * functions when they are called with a NULL handle.
 *
 * Returns the new instance, or NULL on invalid parameters or when any
 * initialisation step fails (everything set up so far is torn down again).
 */
ssiot_t *ssiot_create(ssiot_param_t *param, void (*on_message)(ssiot_message_t *, void *), void *arg)
{
	ssiot_t *inst;
	pthread_t tid;

	if (!param || !param->devsn || !param->product)
		return NULL;

	inst = calloc(1, sizeof(*inst));
	if (!inst)
		return NULL;

	/* The SDK version is printed only while no default instance exists. */
	if (g_ssiot == NULL)
		LOG_I("SSIOT SDK: %s", SSIOT_SDK_VERSION);
	LOG_I("model:%s, devsn:%s", param->product, param->devsn);

	strlcpy(inst->model, param->product, sizeof(inst->model));
	strlcpy(inst->devsn, param->devsn, sizeof(inst->devsn));
	if (param->key)
		inst->key = strdup(param->key);
	inst->on_message = on_message;
	inst->msg_arg = arg;

	libnotify_init(&inst->notify);
	if (ssiot_param_init(param, &inst->param) < 0) {
		LOG_W("param invalid");
		goto fail;
	}

	if (hmqtt_init(&inst->mqtt, on_ssiot_receive, inst) < 0)
		goto fail;

	/* Both threads poll this flag; it must be set before they start. */
	inst->running = true;
	if (pthread_create(&tid, NULL, mqtt_thread_main, inst) != 0)
		goto fail;
	inst->mqtt_thread = tid;

	/* Packet queue used by ssiot_publish() and the sender thread. */
	inst->cache_max = SSIOT_DATA_CACHE_SIZE;
	INIT_LIST_HEAD(&inst->pkg_list);
	pthread_mutex_init(&inst->pkg_mutex, NULL);
	if (pthread_create(&tid, NULL, data_thread_main, inst) != 0)
		goto fail;
	inst->data_thread = tid;

	if (g_ssiot == NULL)
		g_ssiot = inst;

	return inst;

fail:
	ssiot_destory(inst);

	return NULL;
}

/*
 * Stop and release an instance.
 *
 * ssiot: instance handle, or NULL to destroy the default instance.
 *
 * A running instance is flagged to stop, disconnected and woken up; any
 * started threads are joined before the MQTT client and the owned strings
 * are released. The default-instance pointer is cleared when it refers to
 * this instance.
 *
 * Returns 0 on success, -ENOENT when there is no instance to destroy.
 */
int ssiot_destory(ssiot_t *ssiot)
{
	if (ssiot == NULL)
		ssiot = g_ssiot;
	if (ssiot == NULL)
		return -ENOENT;

	if (ssiot->running) {
		ssiot->running = false;
		hmqtt_disconnect(&ssiot->mqtt, "destory");
		libnotify_wakeup(&ssiot->notify);
	}

	if (ssiot->mqtt_thread)
		pthread_join(ssiot->mqtt_thread, NULL);

	if (ssiot->data_thread)
		pthread_join(ssiot->data_thread, NULL);

	hmqtt_uninit(&ssiot->mqtt);

	/* Both strings come from strdup(); free(NULL) is a no-op. */
	free(ssiot->goto_server);
	ssiot->goto_server = NULL;
	free(ssiot->key);
	ssiot->key = NULL;

	if (ssiot == g_ssiot)
		g_ssiot = NULL;
	free(ssiot);

	return 0;
}


/*
 * Apply new parameters to an existing instance and force a reconnect.
 *
 * ssiot: instance handle, or NULL to use the default instance.
 * param: new parameters. product, devsn and key are optional and replace the
 *        stored values only when present; the server address is mandatory
 *        and must be valid.
 *
 * Note that product, devsn and key are updated before the server address is
 * validated, so they stay changed even when an error is returned.
 *
 * Returns 0 on success, -EINVAL on bad arguments or an invalid server.
 */
int ssiot_reinit(ssiot_t *ssiot, ssiot_param_t *param)
{
	int ret;

	if (ssiot == NULL)
		ssiot = g_ssiot;
	if (ssiot == NULL || param == NULL)
		return -EINVAL;

	/* Each field is guarded by its own pointer. */
	if (param->product)
		strlcpy(ssiot->model, param->product, sizeof(ssiot->model));
	if (param->devsn)
		strlcpy(ssiot->devsn, param->devsn, sizeof(ssiot->devsn));
	if (param->key) {
		free(ssiot->key);
		ssiot->key = strdup(param->key);
	}

	ret = ssiot_param_init(param, &ssiot->param);
	if (ret < 0)
		return ret;

	/* Clear the backoff state so the reconnect happens without delay. */
	ssiot->fails = 0;
	ssiot->retry = 0;
	ssiot->next_time = 0;
	hmqtt_disconnect(&ssiot->mqtt, "reinit");

	return 0;
}
